SUGGESTION: KAFKA-8147: Add changelog topic configuration to KTable suppress#1
Open
vvcephei wants to merge 2 commits intohighluck:log-enablefrom
vvcephei:john-suggest-8029
Open
SUGGESTION: KAFKA-8147: Add changelog topic configuration to KTable suppress#1vvcephei wants to merge 2 commits intohighluck:log-enablefrom vvcephei:john-suggest-8029
vvcephei wants to merge 2 commits intohighluck:log-enablefrom
vvcephei:john-suggest-8029
Conversation
Author
|
Hi @highluck , this is just my suggestion as part of the review of apache#8029 (review) . It was just easier to express my thoughts in the form of code than words. Basically, I'd like to try and prevent making this class hierarchy complicated to reason about, and the way to do that is to keep all the state in one place (at the bottom-most implementation class). |
2ecd4f8 to
81ce750
Compare
highluck
pushed a commit
that referenced
this pull request
Mar 11, 2021
…to get end offsets and create topics (apache#9780) The existing `Kafka*BackingStore` classes used by Connect all use `KafkaBasedLog`, which needs to frequently get the end offsets for the internal topic to know whether they are caught up. `KafkaBasedLog` uses its consumer to get the end offsets and to consume the records from the topic. However, the Connect internal topics are often written very infrequently. This means that when the `KafkaBasedLog` used in the `Kafka*BackingStore` classes is already caught up and its last consumer poll is waiting for new records to appear, the call to the consumer to fetch end offsets will block until the consumer returns after a new record is written (unlikely) or the consumer’s `fetch.max.wait.ms` setting (defaults to 500ms) ends and the consumer returns no more records. IOW, the call to `KafkaBasedLog.readToEnd()` may block for some period of time even though it’s already caught up to the end. Instead, we want the `KafkaBasedLog.readToEnd()` to always return quickly when the log is already caught up. The best way to do this is to have the `KafkaBackingStore` use the admin client (rather than the consumer) to fetch end offsets for the internal topic. The consumer and the admin API both use the same `ListOffset` broker API, so the functionality is ultimately the same but we don't have to block for any ongoing consumer activity. Each Connect distributed runtime includes three instances of the `Kafka*BackingStore` classes, which means we have three instances of `KafkaBasedLog`. We don't want three instances of the admin client, and should have all three instances of the `KafkaBasedLog` share a single admin client instance. In fact, each `Kafka*BackingStore` instance currently creates, uses and closes an admin client instance when it checks and initializes that store's internal topic. If we change `Kafka*BackingStores` to share one admin client instance, we can change that initialization logic to also reuse the supplied admin client instance. The final challenge is that `KafkaBasedLog` has been used by projects outside of Apache Kafka. While `KafkaBasedLog` is definitely not in the public API for Connect, we can make these changes in ways that are backward compatible: create new constructors and deprecate the old constructors. Connect can be changed to only use the new constructors, and this will give time for any downstream users to make changes. These changes are implemented as follows: 1. Add a `KafkaBasedLog` constructor to accept in its parameters a supplier from which it can get an admin instance, and deprecate the old constructor. We need a supplier rather than just passing an instance because `KafkaBasedLog` is instantiated before Connect starts up, so we need to create the admin instance only when needed. At the same time, we'll change the existing init function parameter from a no-arg function to accept an admin instance as an argument, allowing that init function to reuse the shared admin instance used by the `KafkaBasedLog`. Note: if no admin supplier is provided (in deprecated constructor that is no longer used in AK), the consumer is still used to get latest offsets. 2. Add to the `Kafka*BackingStore` classes a new constructor with the same parameters but with an admin supplier, and deprecate the old constructor. When the classes instantiate its `KafkaBasedLog` instance, it would pass the admin supplier and pass an init function that takes an admin instance. 3. Create a new `SharedTopicAdmin` that lazily creates the `TopicAdmin` (and underlying Admin client) when required, and closes the admin objects when the `SharedTopicAdmin` is closed. 4. Modify the existing `TopicAdmin` (used only in Connect) to encapsulate the logic of fetching end offsets using the admin client, simplifying the logic in `KafkaBasedLog` mentioned in #1 above. Doing this also makes it easier to test that logic. 5. Change `ConnectDistributed` to create a `SharedTopicAdmin` instance (that is `AutoCloseable`) before creating the `Kafka*BackingStore` instances, passing the `SharedTopicAdmin` (which is an admin supplier) to all three `Kafka*BackingStore objects`, and finally always closing the `SharedTopicAdmin` upon termination. (Shutdown of the worker occurs outside of the `ConnectDistributed` code, so modify `DistributedHerder` to take in its constructor additional `AutoCloseable` objects that should be closed when the herder is closed, and then modify `ConnectDistributed` to pass the `SharedTopicAdmin` as one of those `AutoCloseable` instances.) 6. Change `MirrorMaker` similarly to `ConnectDistributed`. 7. Change existing unit tests to no longer use deprecated constructors. 8. Add unit tests for new functionality. Author: Randall Hauch <rhauch@gmail.com> Reviewer: Konstantine Karantasis <konstantine@confluent.io>
highluck
pushed a commit
that referenced
this pull request
Feb 6, 2024
This change introduces some basic clean up and refactoring for forthcoming commits related to the revised fetch code for the consumer threading refactor project. Reviewers: Christo Lolov <lolovc@amazon.com>, Jun Rao <junrao@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Suggestion for apache#8029
Committer Checklist (excluded from commit message)